package rocketmq

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"go.uber.org/zap"
	"niu-ren/component/ck_log"
	"strings"
)

type CKProducer struct {
	Host     string
	Topic    string
	Producer rocketmq.Producer
}

func NewCKProducer(ctx context.Context, host string, topic string, groupName string, instance string) *CKProducer {
	hosts := strings.Split(host, ";")
	p, _ := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver(hosts)),
		producer.WithRetry(2),
		producer.WithGroupName(groupName),
		producer.WithInstanceName(instance),
	)
	err := p.Start()
	if err != nil {
		ck_log.LogCtx(ctx).Error(err)
		fmt.Printf("start producer error: %s", err.Error())
	}

	return &CKProducer{
		Host:     host,
		Topic:    topic,
		Producer: p,
	}
}

// SendWithTag 同步模式推送消息到队列
func (p *CKProducer) SendWithTag(ctx context.Context, data []byte, tag string) {

	msg := primitive.NewMessage(p.Topic, data).WithTag(tag)
	_, err := p.Producer.SendSync(context.Background(), msg)
	if err != nil {
		ck_log.LogCtx(ctx).Errorw("send message error", zap.Error(err))
	}
}

// SendWithTagAsync 异步模式推送, 原则上效率更高
func (p *CKProducer) SendWithTagAsync(ctx context.Context, data []byte, tag string) {
	msg := primitive.NewMessage(p.Topic, data).WithTag(tag)
	err := p.Producer.SendAsync(context.Background(),
		func(ctx context.Context, result *primitive.SendResult, e error) {
			if e != nil {
				ck_log.LogCtx(ctx).Warnw("receive message error", zap.Error(e))
			}
		}, msg)

	if err != nil {
		fmt.Printf("send message error: %s\n", err)
	}
}
